package com.atguigu.chapter11;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.OverWindow;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.*;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/6/19 9:26
 */
public class Flink09_Table_Window_Over {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<WaterSensor> s1 = env
            .fromElements(new WaterSensor("sensor_1", 1000L, 10),
                          new WaterSensor("sensor_1", 3000L, 20),
                          new WaterSensor("sensor_1", 3000L, 40),
                          new WaterSensor("sensor_1", 5000L, 50)
                //                          new WaterSensor("sensor_2", 3000L, 30),
                //                          new WaterSensor("sensor_2", 6000L, 60)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((ws, ts) -> ws.getTs())
            );
        
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        
        Table table = tEnv.fromDataStream(s1, $("id"), $("ts").rowtime(), $("vc"));
        
        //        OverWindow w = Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w");
        //        OverWindow w = Over.partitionBy($("id")).orderBy($("ts")).preceding(rowInterval(1L)).as("w");
//        OverWindow w = Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w");
        OverWindow w = Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(2).second()).as("w");
        
        table
            .window(w)
            .select($("id"), $("ts"), $("vc"), $("vc").sum().over($("w")))
            .execute()
            .print();
        
    }
}
